From b7df222032d7012801264b5c55035587dcb8ee9f Mon Sep 17 00:00:00 2001 From: "mjw@wray-m-3.hpl.hp.com" Date: Fri, 11 Mar 2005 17:01:00 +0000 Subject: [PATCH] bitkeeper revision 1.1159.269.2 (4231ceccPlcoLKWWixfu3trU9tb-_A) Improve error reporting for save/restore/migrate. Signed-off-by: Mike Wray --- tools/python/xen/xend/XendMigrate.py | 158 +++++++++++++--------- tools/python/xen/xend/server/SrvDomain.py | 17 --- 2 files changed, 91 insertions(+), 84 deletions(-) diff --git a/tools/python/xen/xend/XendMigrate.py b/tools/python/xen/xend/XendMigrate.py index a1c261deb5..f2570ae02b 100644 --- a/tools/python/xen/xend/XendMigrate.py +++ b/tools/python/xen/xend/XendMigrate.py @@ -6,12 +6,14 @@ import errno import sys import socket import time +import types from twisted.internet import reactor from twisted.internet import defer #defer.Deferred.debug = 1 from twisted.internet.protocol import Protocol from twisted.internet.protocol import ClientFactory +from twisted.python.failure import Failure import sxp import XendDB @@ -45,11 +47,9 @@ class Xfrd(Protocol): sxp.show(req, out=self.transport) def loseConnection(self): - print 'Xfrd>loseConnection>' self.transport.loseConnection() def connectionLost(self, reason): - print 'Xfrd>connectionLost>', reason self.xinfo.connectionLost(reason) def dataReceived(self, data): @@ -70,17 +70,15 @@ class XfrdClientFactory(ClientFactory): self.xinfo = xinfo def startedConnecting(self, connector): - print 'Started to connect', 'self=', self, 'connector=', connector + pass def buildProtocol(self, addr): - print 'buildProtocol>', addr return Xfrd(self.xinfo) def clientConnectionLost(self, connector, reason): - print 'clientConnectionLost>', 'connector=', connector, 'reason=', reason + pass def clientConnectionFailed(self, connector, reason): - print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason self.xinfo.error(reason) class XfrdInfo: @@ -90,7 +88,7 @@ class XfrdInfo: """Suspend timeout (seconds). We set a timeout because suspending a domain can hang.""" - timeout = 30 + timeout = 10 def __init__(self): from xen.xend import XendDomain @@ -98,6 +96,9 @@ class XfrdInfo: self.deferred = defer.Deferred() self.suspended = {} self.paused = {} + self.state = 'init' + # List of errors encountered. + self.errors = [] def vmconfig(self): dominfo = self.xd.domain_get(self.src_dom) @@ -107,12 +108,38 @@ class XfrdInfo: val = None return val + def add_error(self, err): + """Add an error to the error list. + Returns the error added (which may have been unwrapped if it + was a Twisted Failure). + """ + while isinstance(err, Failure): + err = err.value + if err not in self.errors: + self.errors.append(err) + return err + + def error_summary(self, msg=None): + """Get a XendError summarising the errors (if any). + """ + if msg is None: + msg = "errors" + if self.errors: + errmsg = msg + ': ' + ', '.join(map(str, self.errors)) + else: + errmsg = msg + return XendError(errmsg) + + def get_errors(self): + """Get the list of errors. + """ + return self.errors + def error(self, err): - print 'Error>', err self.state = 'error' + self.add_error(err) if not self.deferred.called: - print 'Error> calling errback' - self.deferred.errback(err) + self.deferred.errback(self.error_summary()) def dispatch(self, xfrd, val): @@ -139,28 +166,23 @@ class XfrdInfo: cbok(val) def unknown(self, xfrd, val): - print 'unknown>', val xfrd.loseConnection() return None def xfr_err(self, xfrd, val): # If we get an error with non-zero code the operation failed. # An error with code zero indicates hello success. - print 'xfr_err>', val v = sxp.child0(val) - print 'xfr_err>', type(v), v err = int(sxp.child0(val)) if not err: return - self.error(err); + self.error("transfer daemon (xfrd) error: " + str(err)) xfrd.loseConnection() return None def xfr_progress(self, xfrd, val): - print 'xfr_progress>', val return None def xfr_vm_destroy(self, xfrd, val): - print 'xfr_vm_destroy>', val try: vmid = sxp.child0(val) val = self.xd.domain_destroy(vmid) @@ -168,28 +190,32 @@ class XfrdInfo: del self.paused[vmid] if vmid in self.suspended: del self.suspended[vmid] - except: + except StandardError, err: + self.add_error("vm_destroy failed") + self.add_error(err) val = errno.EINVAL return ['xfr.err', val] def xfr_vm_pause(self, xfrd, val): - print 'xfr_vm_pause>', val try: vmid = sxp.child0(val) val = self.xd.domain_pause(vmid) self.paused[vmid] = 1 - except: + except StandardError, err: + self.add_error("vm_pause failed") + self.add_error(err) val = errno.EINVAL return ['xfr.err', val] def xfr_vm_unpause(self, xfrd, val): - print 'xfr_vm_unpause>', val try: vmid = sxp.child0(val) val = self.xd.domain_unpause(vmid) if vmid in self.paused: del self.paused[vmid] - except: + except StandardError, err: + self.add_error("vm_unpause failed") + self.add_error(err) val = errno.EINVAL return ['xfr.err', val] @@ -199,7 +225,6 @@ class XfrdInfo: Suspending can hang, so we set a timeout and fail if it takes too long. """ - print 'xfr_vm_suspend>', val try: vmid = sxp.child0(val) d = defer.Deferred() @@ -208,15 +233,15 @@ class XfrdInfo: # the domain died. Set a timeout and error handler so the subscriptions # will be cleaned up if suspending hangs or there is an error. def onSuspended(e, v): - print 'xfr_vm_suspend>onSuspended>', e, v if v[1] != vmid: return subscribe(on=0) - d.callback(v) + if not d.called: + d.callback(v) def onDied(e, v): - print 'xfr_vm_suspend>onDied>', e, v if v[1] != vmid: return - d.errback(XendError('Domain died')) + if not d.called: + d.errback(XendError('Domain %s died while suspending' % vmid)) def subscribe(on=1): if on: @@ -227,24 +252,25 @@ class XfrdInfo: action('xend.domain.died', onDied) def cberr(err): - print 'xfr_vm_suspend>cberr>', err subscribe(on=0) + self.add_error("suspend failed") + self.add_error(err) return err + d.addErrback(cberr) + d.setTimeout(self.timeout) subscribe() val = self.xd.domain_shutdown(vmid, reason='suspend') self.suspended[vmid] = 1 - d.addErrback(cberr) - d.setTimeout(self.timeout) return d except Exception, err: - print 'xfr_vm_suspend> Exception', err + self.add_error("suspend failed") + self.add_error(err) traceback.print_exc() val = errno.EINVAL return ['xfr.err', val] def connectionLost(self, reason=None): - print 'XfrdInfo>connectionLost>', reason for vmid in self.suspended: try: self.xd.domain_destroy(vmid) @@ -279,7 +305,7 @@ class XendMigrateInfo(XfrdInfo): ['id', self.xid ], ['state', self.state ], ['live', self.live ], - ['resource', self.resource] ] + ['resource', self.resource ] ] sxpr_src = ['src', ['host', self.src_host], ['domain', self.src_dom] ] sxpr.append(sxpr_src) sxpr_dst = ['dst', ['host', self.dst_host] ] @@ -291,12 +317,12 @@ class XendMigrateInfo(XfrdInfo): def request(self, xfrd): vmconfig = self.vmconfig() if not vmconfig: + self.error(XendError("vm config not found")) xfrd.loseConnection() return - log.info('Migrate BEGIN: ' + str(self.sxpr())) + log.info('Migrate BEGIN: %s' % str(self.sxpr())) eserver.inject('xend.domain.migrate', - [ self.dominfo.name, self.dominfo.id, - "begin", self.sxpr() ]) + [ self.dominfo.name, self.dominfo.id, "begin", self.sxpr() ]) xfrd.request(['xfr.migrate', self.src_dom, vmconfig, @@ -305,19 +331,6 @@ class XendMigrateInfo(XfrdInfo): self.live, self.resource ]) -## def xfr_vm_suspend(self, xfrd, val): -## def cbok(val): -## # Special case for localhost: destroy devices early. -## if self.dst_host in ["localhost", "127.0.0.1"]: -## self.dominfo.restart_cancel() -## self.dominfo.cleanup() -## self.dominfo.destroy_console() -## return val - -## d = XfrdInfo.xfr_vm_suspend(self, xfrd, val) -## d.addCallback(cbok) -## return d - def xfr_migrate_ok(self, xfrd, val): dom = int(sxp.child0(val)) self.state = 'ok' @@ -327,17 +340,15 @@ class XendMigrateInfo(XfrdInfo): self.deferred.callback(self) def connectionLost(self, reason=None): - print 'XfrdMigrateInfo>connectionLost>', reason XfrdInfo.connectionLost(self, reason) if self.state =='ok': log.info('Migrate OK: ' + str(self.sxpr())) else: self.state = 'error' - self.error(XendError("migrate failed")) + self.error("migrate failed") log.info('Migrate ERROR: ' + str(self.sxpr())) eserver.inject('xend.domain.migrate', - [ self.dominfo.name, self.dominfo.id, - self.state, self.sxpr() ]) + [ self.dominfo.name, self.dominfo.id, self.state, self.sxpr() ]) class XendSaveInfo(XfrdInfo): """Representation of a save in-progress and its interaction with xfrd. @@ -361,16 +372,15 @@ class XendSaveInfo(XfrdInfo): return sxpr def request(self, xfrd): - print '***request>', self.vmconfig() vmconfig = self.vmconfig() if not vmconfig: + self.error(XendError("vm config not found")) xfrd.loseConnection() return - print '***request> begin' log.info('Save BEGIN: ' + str(self.sxpr())) eserver.inject('xend.domain.save', - [self.dominfo.name, self.dominfo.id, - "begin", self.sxpr()]) + [ self.dominfo.name, self.dominfo.id, + "begin", self.sxpr() ]) xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file ]) def xfr_save_ok(self, xfrd, val): @@ -380,13 +390,12 @@ class XendSaveInfo(XfrdInfo): self.deferred.callback(self) def connectionLost(self, reason=None): - print 'XfrdSaveInfo>connectionLost>', reason XfrdInfo.connectionLost(self, reason) if self.state =='ok': log.info('Save OK: ' + str(self.sxpr())) else: self.state = 'error' - self.error(XendError("save failed")) + self.error("save failed") log.info('Save ERROR: ' + str(self.sxpr())) eserver.inject('xend.domain.save', [ self.dominfo.name, self.dominfo.id, @@ -409,8 +418,9 @@ class XendRestoreInfo(XfrdInfo): return sxpr def request(self, xfrd): - print '***request>', self.file log.info('restore BEGIN: ' + str(self.sxpr())) + eserver.inject('xend.restore', [ 'begin', self.sxpr()]) + xfrd.request(['xfr.restore', self.file ]) def xfr_restore_ok(self, xfrd, val): @@ -419,8 +429,17 @@ class XendRestoreInfo(XfrdInfo): self.state = 'ok' if not self.deferred.called: self.deferred.callback(dominfo) - + def connectionLost(self, reason=None): + XfrdInfo.connectionLost(self, reason) + if self.state =='ok': + log.info('Restore OK: ' + self.file) + else: + self.state = 'error' + self.error("restore failed") + log.info('Restore ERROR: ' + str(self.sxpr())) + eserver.inject('xend.restore', [ self.state, self.sxpr()]) + class XendMigrate: """External api for interaction with xfrd for migrate and save. Singleton. @@ -445,7 +464,6 @@ class XendMigrate: self.db.saveall("", self.session_db) def sync_session(self, xid): - print 'sync_session>', type(xid), xid, self.session_db[xid] self.db.save(xid, self.session_db[xid]) def close(self): @@ -458,7 +476,6 @@ class XendMigrate: self.sync_session(xid) def _delete_session(self, xid): - print '***_delete_session>', xid if xid in self.session: del self.session[xid] if xid in self.session_db: @@ -482,16 +499,23 @@ class XendMigrate: @param info: session @return: deferred """ - def cbremove(val): - print '***cbremove>', val + dfr = defer.Deferred() + def cbok(val): self._delete_session(info.xid) + if not dfr.called: + dfr.callback(val) return val + def cberr(err): + self._delete_session(info.xid) + if not dfr.called: + dfr.errback(err) + return err self._add_session(info) - info.deferred.addCallback(cbremove) - info.deferred.addErrback(cbremove) + info.deferred.addCallback(cbok) + info.deferred.addErrback(cberr) xcf = XfrdClientFactory(info) reactor.connectTCP('localhost', XFRD_PORT, xcf) - return info.deferred + return dfr def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0, resource=0): """Begin to migrate a domain to another host. diff --git a/tools/python/xen/xend/server/SrvDomain.py b/tools/python/xen/xend/server/SrvDomain.py index d73a39bff5..a4c06f6c8d 100644 --- a/tools/python/xen/xend/server/SrvDomain.py +++ b/tools/python/xen/xend/server/SrvDomain.py @@ -29,13 +29,8 @@ class SrvDomain(SrvDir): [['dom', 'int'], ['config', 'sxpr']]) deferred = fn(req.args, {'dom': self.dom.dom}) - deferred.addErrback(self._op_configure_err, req) return deferred - def _op_configure_err(self, err, req): - req.setResponseCode(http.BAD_REQUEST, "Error: "+ str(err)) - return str(err) - def op_unpause(self, op, req): val = self.xd.domain_unpause(self.dom.name) return val @@ -68,16 +63,11 @@ class SrvDomain(SrvDir): ['file', 'str']]) deferred = fn(req.args, {'dom': self.dom.id}) deferred.addCallback(self._op_save_cb, req) - deferred.addErrback(self._op_save_err, req) return deferred def _op_save_cb(self, val, req): return 0 - def _op_save_err(self, err, req): - req.setResponseCode(http.BAD_REQUEST, "Error: "+ str(err)) - return str(err) - def op_migrate(self, op, req): fn = FormFn(self.xd.domain_migrate, [['dom', 'str'], @@ -85,9 +75,7 @@ class SrvDomain(SrvDir): ['live', 'int'], ['resource', 'int']]) deferred = fn(req.args, {'dom': self.dom.id}) - print 'op_migrate>', deferred deferred.addCallback(self._op_migrate_cb, req) - deferred.addErrback(self._op_migrate_err, req) return deferred def _op_migrate_cb(self, info, req): @@ -101,11 +89,6 @@ class SrvDomain(SrvDir): print '_op_migrate_cb> url=', url return url - def _op_migrate_err(self, err, req): - print '_op_migrate_err>', err, req - req.setResponseCode(http.BAD_REQUEST, "Error: "+ str(err)) - return str(err) - def op_pincpu(self, op, req): fn = FormFn(self.xd.domain_pincpu, [['dom', 'str'], -- 2.30.2